package org.idea.mq.framework.rocketmq.consumer;

import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;


import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Set;

/**
 * Demo entry point that pulls messages from the {@code pull-topic} topic with a
 * {@link DefaultLitePullConsumer} and prints each message's broker store time and body
 * to standard output.
 *
 * <p>Created 2022/4/20, 5:30 PM.
 *
 * @author linhao
 */
public class MqConsumerPullApplication {

    public static final String NAME_SERVER = "localhost:9876";

    /** Fixed UTC+8 offset used when rendering the broker store timestamp. */
    private static final ZoneOffset DISPLAY_OFFSET = ZoneOffset.ofHours(8);

    /**
     * Starts the pull consumer and polls in an endless loop, printing every received message.
     *
     * @param args command-line arguments (unused)
     * @throws MQClientException if subscribing to the topic or starting the consumer fails
     */
    public static void main(String[] args) throws MQClientException {
        DefaultLitePullConsumer pullConsumer = new DefaultLitePullConsumer("pulConsumerGroup");
        pullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        pullConsumer.subscribe("pull-topic", "*");
        pullConsumer.setAutoCommit(true);
        pullConsumer.setNamesrvAddr(NAME_SERVER);
        pullConsumer.start();

        try {
            while (true) {
                List<MessageExt> messageExtList = pullConsumer.poll();
                for (MessageExt msg : messageExtList) {
                    System.out.println(describe(msg.getStoreTimestamp(), msg.getBody()));
                }
            }
        } finally {
            // The loop only exits through an exception; release the client's resources
            // instead of leaving the consumer running behind a dead main thread.
            pullConsumer.shutdown();
        }
    }

    /**
     * Builds the console line for one message.
     *
     * @param storeTimestamp broker store time in epoch milliseconds
     * @param body raw message payload, decoded as UTF-8
     * @return the store time rendered at UTC+8 followed by the decoded body
     */
    private static String describe(long storeTimestamp, byte[] body) {
        LocalDateTime storedAt =
                Instant.ofEpochMilli(storeTimestamp).atOffset(DISPLAY_OFFSET).toLocalDateTime();
        // Decode with an explicit charset so the output does not depend on the platform default.
        return "broker存储时间:" + storedAt + " " + new String(body, StandardCharsets.UTF_8);
    }
}
